查看原文
其他

深度学习算法(第34期)----强化学习之梯度策略实现

左右Shawn 智能算法 2021-09-10

上期我们一起学习了强化学习中OpenAI中平衡车的相关环境以及搭建神经网络策略的相关知识,
深度学习算法(第33期)----强化学习之神经网络策略学习平衡车
今天我们学习强化学习中行为评价和梯度策略的相关知识。

行为评价:信用分配问题

如果我们知道每一步的最佳动作,我们可以像通常一样通过最小化估计概率和目标概率之间的交叉熵来训练神经网络。那么这就成为了常见的监督学习。然而,在强化学习中,智能体获得指导的唯一途径是通过奖励,并且奖励通常是稀疏的和延迟的。例如,如果智能体在100个步骤内设法平衡杆,它怎么知道它采取的100个行动中的哪一个是好的,哪些是坏的?它所知道的只是在最后一次行动之后,杆子坠落了,但我们知道,这肯定不能完全怪最后一次行动。这被称为信用分配问题:当智能体得到奖励时,很难知道哪些行为应该被信任(或责备)。想想一只小狗在行为良好后几小时就会得到奖励,那么它会想明白为什么会得到奖励吗?

为了解决这个问题,一个通常的策略就是基于这个动作后的总得分来评估这个动作,通常在每步中应用衰减率r。如下图:
如果一个智能体决定连续三次向右,在第一步之后得到+10奖励,第二步后得到0,最后在第三步之后得到-50,然后假设我们使用衰减率r=0.8,那么第一个动作它将得到10 + r×0 + r^2 × (-50) = -22 的分数。如果衰减率接近0,那么与即时奖励相比,未来的奖励就不会有太大意义。相反,如果衰减率接近1,那么对未来的奖励几乎等于即时回报。典型的衰减率通常为是0.95或0.99。如果衰减率为0.95,那么未来13步的奖励大约是即时奖励的一半(0.95^13 ≈ 0.5),而当衰减率为0.99,未来69步的奖励是即时奖励的一半。在 CartPole 环境下,行为具有相当短期的影响,因此选择 0.95 的折扣率是合理的。

当然,一个好的动作可能会伴随着一些坏的动作,这些动作会导致平衡杆迅速下降,从而导致一个好的动作得到一个低分数(类似的,一个好演员有时会在一部烂片中扮演主角)。然而,如果我们花足够多的时间来训练游戏,平均下来好的行为会得到比坏的行为更高的分数。因此,为了获得相当可靠的动作分数,我们必须多次运行并将所有得分数归一化(通过减去平均值并除以标准偏差)。之后,我们可以合理地假设消极得分的行为是坏的,而积极得分的行为是好的。好了,到现在我们已经有了一个方法来评估每一个动作,我们已经准备好使用策略梯度来训练我们的第一个智能体。我们一起看看如何训练。

策略梯度

正如前面所讨论的,梯度策略算法通过沿着更高回报的梯度来优化策略参数。一种流行的策略梯度算法,称为增强算法,在 1929 由 Ronald Williams 提出。下面这是一个常见的变体:

  • 首先,让神经网络策略玩几次游戏,并在每一步计算梯度,这使得智能体更可能的去选择行动,但不应用这些梯度。

  • 运行几次后,使用前面描述的方法计算每个动作的得分。

  • 如果一个动作的分数是正的,这意味着动作是好的,可以应用较早计算的梯度,以便将来有更大的的概率选择这个动作。但是,如果分数是负的,这意味着动作是坏的,要应用相反的梯度来使得这个动作在将来采取的可能性更低。我们的方法就是简单地将每个梯度向量乘以相应的动作得分。

  • 最后,计算所有得到的梯度向量的平均值,并使用它来执行梯度下降步骤。

TensorFlow中怎么实现这个算法呢?我们将训练我们上一期建立的神经网络策略,
深度学习算法(第33期)----强化学习之神经网络策略学习平衡车
让它学会平衡车上的平衡杆。让我们从完成之前编码的构造阶段开始,添加目标概率、代价函数和训练操作。因为我们认为选择的动作便是最好的动作,所以如果选择的动作是动作 0(左),则目标概率必须为 1,如果选择动作 1(右)则目标概率为 0:

y = 1.0 - tf.to_float(action)

有了目标概率,我们就可以定义损失函数(交叉熵)并计算梯度了:

learning_rate = 0.01

cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)

optimizer = tf.train.AdamOptimizer(learning_rate)
grads_and_vars = optimizer.compute_gradients(cross_entropy)

需要注意的是,这里调用optimizer的compute_gradients()方法,而不是minimize()方法。这是因为我们想要在使用它们之前调整梯度,而compute_gradients()方法返回梯度向量/变量对的列表(每个可训练变量一对)。接下来我们把所有的梯度放在一个列表中,以便方便地获得它们的值:

gradients = [grad for grad, variable in grads_and_vars]

接下来将是重头戏。在执行阶段,算法将运行策略,并在每一步我们将评估这些梯度张量并存下来。在多次运行后,如刚学的一样,将去调整这些梯度(即,通过动作得分乘以梯度并使它们归一化),并计算调整后的梯度的平均值。接下来,需要将结果梯度反馈到优化器,以便它可以执行优化步骤。这意味着对于每一个梯度向量我们需要一个placeholder。

另外,我们还得创建操作去应用更新的梯度。为此,我们将调用优化器的apply_gradients()函数,该函数接受梯度向量/变量对的列表。我们不给它原始的梯度向量,而是给它一个包含更新梯度的列表(即,通过占位符递送的梯度),如下:

gradient_placeholders = []
grads_and_vars_feed = []

for grad, variable in grads_and_vars:
gradient_placeholder = tf.placeholder(tf.float32, shape=grad.get_shape())
gradient_placeholders.append(gradient_placeholder)
grads_and_vars_feed.append((gradient_placeholder, variable))

training_op = optimizer.apply_gradients(grads_and_vars_feed)

接下来我们完整的看一下构建过程:

n_inputs = 4
n_hidden = 4
n_outputs = 1
initializer = tf.contrib.layers.variance_scaling_initializer()

learning_rate = 0.01
X = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden = fully_connected(X, n_hidden, activation_fn=tf.nn.elu,weights_initializer=initializer)
logits = fully_connected(hidden, n_outputs, activation_fn=None, weights_initializer=initializer)
outputs = tf.nn.sigmoid(logits)
p_left_and_right = tf.concat(axis=1, values=[outputs, 1 - outputs])
action = tf.multinomial(tf.log(p_left_and_right), num_samples=1)

y = 1.0 - tf.to_float(action)
cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)
optimizer = tf.train.AdamOptimizer(learning_rate)
grads_and_vars = optimizer.compute_gradients(cross_entropy)
gradients = [grad for grad, variable in grads_and_vars]
gradient_placeholders = []
grads_and_vars_feed = []
for grad, variable in grads_and_vars:
gradient_placeholder = tf.placeholder(tf.float32, shape=grad.get_shape()) gradient_placeholders.append(gradient_placeholder)
grads_and_vars_feed.append((gradient_placeholder, variable))
training_op = optimizer.apply_gradients(grads_and_vars_feed)

init = tf.global_variables_initializer()
saver = tf.train.Saver()

在执行阶段,我们将需要两个函数来计算总的折扣奖励,给出原始奖励,以及归一化多次循环的结果:

def discount_rewards(rewards, discount_rate):
discounted_rewards = np.empty(len(rewards))
cumulative_rewards = 0
for step in reversed(range(len(rewards))):
cumulative_rewards = rewards[step] + cumulative_rewards * discount_rate discounted_rewards[step] = cumulative_rewards
return discounted_rewards

def discount_and_normalize_rewards(all_rewards, discount_rate):
all_discounted_rewards = [discount_rewards(rewards) for rewards in all_rewards]
flat_rewards = np.concatenate(all_discounted_rewards)
reward_mean = flat_rewards.mean()
reward_std = flat_rewards.std()
return [(discounted_rewards - reward_mean)/reward_std for discounted_rewards in all_discounted_rewards]

运行一下看看:

>>> discount_rewards([10, 0, -50], discount_rate=0.8)
array([-22., -40., -50.])
>>> discount_and_normalize_rewards([[10, 0, -50], [10, 20]], discount_rate=0.8)
[array([-0.28435071, -0.86597718, -1.18910299]), array([ 1.26665318, 1.0727777 ])]

对discount_rewards()的调用正好返回我们所期望的(见上图)。我们也可以验证函数iscount_and_normalize_rewards()确实返回了两个步骤中每个动作的标准化分数。注意第一步比第二步差很多,所以它的归一化分数都是负的;从第一步开始的所有动作都会被认为是坏的,反之,第二步的所有动作都会被认为是好的。

好了,现在我们可以训练策略了:

n_iterations = 250 # 训练迭代次数
n_max_steps = 1000 # 每一次的最大步长
n_games_per_update = 10 # 每迭代十次训练一次策略网络
save_iterations = 10 # 每十次迭代保存模型
discount_rate = 0.95
with tf.Session() as sess:
init.run()
for iteration in range(n_iterations):
all_rewards = [] #每一次的所有奖励
all_gradients = [] #每一次的所有梯度
for game in range(n_games_per_update):
current_rewards = [] #当前步的所有奖励
current_gradients = [] #当前步的所有梯度
obs = env.reset()
for step in range(n_max_steps):
action_val, gradients_val = sess.run([action, gradients],
feed_dict={X: obs.reshape(1, n_inputs)}) # 一个obs
obs, reward, done, info = env.step(action_val[0][0]) current_rewards.append(reward)
current_gradients.append(gradients_val)
if done:
break
all_rewards.append(current_rewards)
all_gradients.append(current_gradients)
# 此时我们每10次运行一次策略,我们已经准备好使用之前描述的算法去更新策略,注:即使用迭代10次的结果来优化当前的策略。
all_rewards = discount_and_normalize_rewards(all_rewards)
feed_dict = {}
for var_index, grad_placeholder in enumerate(gradient_placeholders):
# 将梯度与行为分数相乘,并计算平均值
mean_gradients = np.mean([reward * all_gradients[game_index][step][var_index] for game_index, rewards in enumerate(all_rewards) for step, reward in enumerate(rewards)],axis=0)
feed_dict[grad_placeholder] = mean_gradients
sess.run(training_op, feed_dict=feed_dict)
if iteration % save_iterations == 0:
saver.save(sess, "./my_policy_net_pg.ckpt")

每一次训练迭代都是以运行10次的策略开始的(每次最多 1000 步,以避免永远运行)。在每一步,我们也计算梯度,假设选择的行动是最好的。在运行了这10次之后,我们使用discount_and_normalize_rewards()函数计算动作得分;我们遍历每个可训练变量,在所有次数和所有步骤中,通过其相应的动作分数来乘以每个梯度向量;并且我们计算结果的平均值。最后,我们运行训练操作,给它提供平均梯度(对每个可训练变量提供一个)。我们继续每10个训练次数保存一次模型。

这段代码将训练神经网络策略,它将成功地学会平衡车上的平衡杆(你可以在Juyter notebook上试用)。注意,实际上有两种方法可以让玩家游戏结束:要么平衡可以倾斜太大,要么车完全脱离屏幕。在250次训练迭代中,策略学会平衡极点,但在避免脱离屏幕方面还不够好。额外数百次的训练迭代可以解决这一问题。

尽管它相对简单,但是该算法是非常强大的。你可以用它来解决更难的问题,而不仅仅是平衡一辆手推车上的平衡杆。事实上,AlgPaGo 是基于类似的 策略梯度算法(加上蒙特卡罗树搜索等)。

至此,我们今天学习了梯度策略的相关知识,下期我们将一起学习下马尔科夫决策过程怎么在强化学习中大显身手。希望有些收获,欢迎留言或进社区共同交流,喜欢的话,就点个赞吧,您也可以置顶公众号,第一时间接收最新内容。


智能算法,与您携手,沉淀自己,引领AI!


: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存